/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io;



import com.bff.gaia.unified.sdk.io.range.OffsetRange;

import com.bff.gaia.unified.sdk.io.range.OffsetRangeTracker;

import com.bff.gaia.unified.sdk.io.range.RangeTracker;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.transforms.display.DisplayData;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import java.util.NoSuchElementException;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;



/**
 * Base class for {@link BoundedSource}s whose input is addressed by a contiguous range of {@code
 * long} offsets.
 *
 * <p>Use {@link OffsetBasedSource} when the whole input can be described as one half-open range
 * {@code [startOffset, endOffset)} and reading can begin cheaply at any offset. The class stores
 * that range and implements initial splitting by cutting it into disjoint subranges whose union is
 * the original range. Dynamic splitting while reading is handled by {@link OffsetBasedReader}.
 *
 * <p>To write a custom offset-based source, extend this class. For file-backed sources, prefer
 * extending {@link FileBasedSource}, a subclass that adds file-specific functionality.
 *
 * <p>The semantics shared by all range-defined sources, including split points (see {@link
 * OffsetBasedReader#isAtSplitPoint}), are described in {@link RangeTracker}.
 *
 * @param <T> type of the records produced by this source
 * @see BoundedSource
 * @see FileBasedSource
 * @see RangeTracker
 */
public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
  private final long startOffset;
  private final long endOffset;
  private final long minBundleSize;

  /**
   * Creates a source over the offsets {@code [startOffset, endOffset)}.
   *
   * <p>The arguments are stored exactly as given. The constraints listed below are checked by
   * {@link #validate()}, not by this constructor.
   *
   * @param startOffset inclusive first offset of the source; must be non-negative
   * @param endOffset exclusive end offset of the source; must be non-negative and not smaller than
   *     {@code startOffset}. Pass {@link Long#MAX_VALUE} to read everything from {@code
   *     startOffset} up to {@link #getMaxEndOffset(PipelineOptions)}
   * @param minBundleSize smallest bundle, in offset units, to produce when splitting into
   *     sub-sources; must be non-negative. It cannot be honored when the whole range is shorter
   *     than this value
   */
  public OffsetBasedSource(long startOffset, long endOffset, long minBundleSize) {
    this.startOffset = startOffset;
    this.endOffset = endOffset;
    this.minBundleSize = minBundleSize;
  }

  /** Returns the inclusive first offset this source was constructed with. */
  public long getStartOffset() {
    return startOffset;
  }

  /**
   * Returns the exclusive end offset this source was constructed with. A value at or above {@link
   * #getMaxEndOffset(PipelineOptions)} is to be interpreted as {@link
   * #getMaxEndOffset(PipelineOptions)}.
   */
  public long getEndOffset() {
    return endOffset;
  }

  /**
   * Returns the smallest bundle size, in offset units, to use when splitting into sub-sources. It
   * is not honored when the whole range of the source is shorter than this value.
   */
  public long getMinBundleSize() {
    return minBundleSize;
  }

  /**
   * Estimates the size as {@link #getBytesPerOffset()} times the length of the offset range.
   *
   * <p>An end offset of {@link Long#MAX_VALUE} is replaced by {@link
   * #getMaxEndOffset(PipelineOptions)}. Any other end offset is used unchanged.
   */
  @Override
  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
    long effectiveEnd = endOffset;
    if (effectiveEnd == Long.MAX_VALUE) {
      effectiveEnd = getMaxEndOffset(options);
    }
    long bytesPerOffset = getBytesPerOffset();
    long offsetCount = effectiveEnd - getStartOffset();
    return bytesPerOffset * offsetCount;
  }

  /**
   * Splits this source into sub-sources covering {@code [startOffset, min(endOffset,
   * maxEndOffset))}.
   *
   * <p>The requested bundle size is converted from bytes to offset units with {@link
   * #getBytesPerOffset()}. It is then raised to at least one unit and to at least the minimum
   * bundle size, so a small request never produces bundles smaller than the minimum.
   */
  @Override
  public List<? extends OffsetBasedSource<T>> split(
      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
    long requestedUnits = desiredBundleSizeBytes / getBytesPerOffset();
    long bundleUnits = Math.max(minBundleSize, Math.max(requestedUnits, 1L));
    long clampedEnd = Math.min(endOffset, getMaxEndOffset(options));

    List<OffsetBasedSource<T>> bundles = new ArrayList<>();
    OffsetRange wholeRange = new OffsetRange(startOffset, clampedEnd);
    for (OffsetRange piece : wholeRange.split(bundleUnits, minBundleSize)) {
      bundles.add(createSourceForSubrange(piece.getFrom(), piece.getTo()));
    }
    return bundles;
  }

  /**
   * Checks the constructor arguments.
   *
   * <p>Both offsets must be non-negative, the start must not exceed the end, and the minimum
   * bundle size must be non-negative. The checks run in that order, and the first failing one is
   * reported.
   */
  @Override
  public void validate() {
    requireNonNegative(startOffset, "Start offset has value %s, must be non-negative");
    requireNonNegative(endOffset, "End offset has value %s, must be non-negative");
    checkArgument(
        startOffset <= endOffset,
        "Start offset %s may not be larger than end offset %s",
        startOffset,
        endOffset);
    requireNonNegative(minBundleSize, "minBundleSize has value %s, must be non-negative");
  }

  /** Fails via {@code checkArgument} with {@code messageTemplate} when {@code value} is negative. */
  private static void requireNonNegative(long value, String messageTemplate) {
    checkArgument(value >= 0, messageTemplate, value);
  }

  /** Renders the range in half-open interval notation, e.g. {@code [0, 100)}. */
  @Override
  public String toString() {
    return new StringBuilder()
        .append('[')
        .append(startOffset)
        .append(", ")
        .append(endOffset)
        .append(')')
        .toString();
  }

  /**
   * Returns the approximate number of bytes represented by one offset unit of this source.
   *
   * <p>It is used to convert between offsets and byte-based quantities such as {@link
   * #getEstimatedSizeBytes} and the bundle size passed to {@link #split}. The default of {@code 1}
   * suits sources whose offsets are byte positions, such as files.
   */
  public long getBytesPerOffset() {
    return 1L;
  }

  /**
   * Returns the real end of the input.
   *
   * <p>{@link #split} clips the range to {@code [startOffset, min(endOffset, maxEndOffset))}, and
   * {@link #getEstimatedSizeBytes} uses this value in place of an end offset of {@link
   * Long#MAX_VALUE}. For example, a file source constructed with {@code endOffset ==
   * Long.MAX_VALUE} (meaning "read to the end of the file") should return the exact size of the
   * file in bytes.
   */
  public abstract long getMaxEndOffset(PipelineOptions options) throws Exception;

  /**
   * Returns a source that reads only the subrange {@code [start, end)} of this source.
   *
   * <p>Callers must keep the subrange inside this source's range, i.e. {@code startOffset <= start
   * < end <= endOffset}.
   */
  public abstract OffsetBasedSource<T> createSourceForSubrange(long start, long end);

  /**
   * Adds {@code minBundleSize}, {@code startOffset} and {@code endOffset} to the display data.
   *
   * <p>Each item is registered via {@code addIfNotDefault}, with defaults {@code 1}, {@code 0} and
   * {@link Long#MAX_VALUE} respectively.
   */
  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);
    builder
        .addIfNotDefault(
            DisplayData.item("minBundleSize", minBundleSize).withLabel("Minimum Bundle Size"), 1L)
        .addIfNotDefault(
            DisplayData.item("startOffset", startOffset).withLabel("Start Read Offset"), 0L)
        .addIfNotDefault(
            DisplayData.item("endOffset", endOffset).withLabel("End Read Offset"), Long.MAX_VALUE);
  }

  /**
   * Shared {@link Source.Reader} machinery for reading any {@link OffsetBasedSource}.
   *
   * <p>Every record a subclass produces is claimed against an {@link OffsetRangeTracker}.
   * Subclasses therefore do not check the end of the range, or react to concurrent dynamic splits,
   * themselves. A subclass supplies:
   *
   * <ul>
   *   <li>{@link #startImpl} and {@link #advanceImpl}, which move to the first record and to each
   *       following record;
   *   <li>{@link #getCurrent} and {@link #getCurrentOffset}, plus optionally {@link
   *       #isAtSplitPoint} and {@link #getCurrentTimestamp}, which describe the record most
   *       recently produced by {@link #startImpl} or {@link #advanceImpl}.
   * </ul>
   */
  public abstract static class OffsetBasedReader<T> extends BoundedReader<T> {
    private static final Logger LOG = LoggerFactory.getLogger(OffsetBasedReader.class);

    /** Tracks the offset range being read and the records claimed within it. */
    private final OffsetRangeTracker rangeTracker;

    /**
     * The source this reader currently covers. It is replaced by the primary half after a
     * successful {@link #splitAtFraction}.
     */
    private OffsetBasedSource<T> source;

    /** @param source the {@link OffsetBasedSource} this reader will read */
    public OffsetBasedReader(OffsetBasedSource<T> source) {
      this.source = source;
      this.rangeTracker = new OffsetRangeTracker(source.getStartOffset(), source.getEndOffset());
    }

    /**
     * Returns whether the range tracker has been marked done. That happens once {@link #start} or
     * {@link #advance} fails to produce a record.
     */
    public final boolean isDone() {
      return rangeTracker.isDone();
    }

    /** Returns whether the range tracker reports that reading has started. */
    public final boolean isStarted() {
      return rangeTracker.isStarted();
    }

    /**
     * Returns the <i>starting</i> offset of the {@link Source.Reader#getCurrent current record}:
     * the record produced by the most recent successful {@link Source.Reader#start} or {@link
     * Source.Reader#advance}.
     *
     * <p>The result is unspecified if no such call has succeeded yet. Offset semantics are
     * described in {@link RangeTracker}.
     */
    protected abstract long getCurrentOffset() throws NoSuchElementException;

    /**
     * Returns whether the current record is a split point. A split point is a record that a
     * source starting at {@link #getCurrentOffset} would read first.
     *
     * <p>The default treats every record as a split point. See {@link RangeTracker} for the
     * detailed semantics.
     */
    protected boolean isAtSplitPoint() throws NoSuchElementException {
      return true;
    }

    @Override
    public final boolean start() throws IOException {
      return claimOrMarkDone(startImpl());
    }

    @Override
    public final boolean advance() throws IOException {
      return claimOrMarkDone(advanceImpl());
    }

    /**
     * Claims the record just produced by {@link #startImpl} or {@link #advanceImpl}, if there is
     * one.
     *
     * <p>Returns {@code true} when the tracker accepts the record's offset. Otherwise, including
     * when no record was produced, the tracker is marked done and the result of {@code markDone()}
     * is returned.
     */
    private boolean claimOrMarkDone(boolean producedRecord) throws IOException {
      if (producedRecord && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset())) {
        return true;
      }
      return rangeTracker.markDone();
    }

    /**
     * Initializes the reader and moves to the first record. Returns {@code true} if a record is
     * available. This method is invoked exactly once, so it may do expensive setup.
     *
     * <p>This is the {@code OffsetBasedReader} counterpart of {@link BoundedReader#start}.
     * Implementations need not check whether the first record is still in range: it may lie past
     * the original {@code endOffset}, or a concurrent {@link #splitAtFraction} may have shrunk the
     * range. {@link #start} checks the record against the range tracker.
     *
     * @see BoundedReader#start
     */
    protected abstract boolean startImpl() throws IOException;

    /**
     * Moves to the next record and returns {@code true}, or returns {@code false} if there is
     * none.
     *
     * <p>This is the {@code OffsetBasedReader} counterpart of {@link BoundedReader#advance}.
     * Implementations need not check whether the next record is still in range: it may lie past
     * the original {@code endOffset}, or a concurrent {@link #splitAtFraction} may have shrunk the
     * range. {@link #advance} checks the record against the range tracker.
     *
     * @see BoundedReader#advance
     */
    protected abstract boolean advanceImpl() throws IOException;

    @Override
    public synchronized OffsetBasedSource<T> getCurrentSource() {
      return source;
    }

    @Override
    public Double getFractionConsumed() {
      return rangeTracker.getFractionConsumed();
    }

    @Override
    public long getSplitPointsConsumed() {
      return rangeTracker.getSplitPointsProcessed();
    }

    @Override
    public long getSplitPointsRemaining() {
      if (isDone()) {
        return 0;
      }
      if (!isStarted()) {
        // Before reading starts we cannot tell whether the source is empty. This holds even for an
        // unsplittable reader, so report UNKNOWN rather than 1.
        return BoundedReader.SPLIT_POINTS_UNKNOWN;
      }
      if (!allowsDynamicSplitting()) {
        // Started (so non-empty) and unsplittable: only the current split point remains.
        return 1;
      }
      // At or past the last offset of the range, no later record can be in range. The current
      // offset may even exceed the stop position when the current record is not a split point.
      boolean atLastOffset = getCurrentOffset() >= rangeTracker.getStopPosition() - 1;
      return atLastOffset ? 1 : super.getSplitPointsRemaining();
    }

    /**
     * Returns whether this reader permits dynamic splitting of its offset range.
     *
     * <p>The default is {@code true}. Override it to return {@code false} if the reader cannot
     * split dynamically in a correct way. {@link OffsetBasedReader#splitAtFraction} then refuses
     * every request.
     */
    public boolean allowsDynamicSplitting() {
      return true;
    }

    @Override
    public final synchronized OffsetBasedSource<T> splitAtFraction(double fraction) {
      if (!allowsDynamicSplitting()) {
        return null;
      }
      if (rangeTracker.getStopPosition() == Long.MAX_VALUE) {
        LOG.debug(
            "Refusing to split unbounded OffsetBasedReader {} at fraction {}",
            rangeTracker,
            fraction);
        return null;
      }

      long splitOffset = rangeTracker.getPositionForFractionConsumed(fraction);
      LOG.debug(
          "Proposing to split OffsetBasedReader {} at fraction {} (offset {})",
          rangeTracker,
          fraction,
          splitOffset);

      // Both halves are created before the tracker is asked to split. An exception from
      // createSourceForSubrange therefore leaves the tracker unchanged.
      long primaryStart = source.getStartOffset();
      long residualEnd = source.getEndOffset();
      OffsetBasedSource<T> primary = source.createSourceForSubrange(primaryStart, splitOffset);
      OffsetBasedSource<T> residual = source.createSourceForSubrange(splitOffset, residualEnd);

      if (rangeTracker.trySplitAtPosition(splitOffset)) {
        this.source = primary;
        return residual;
      }
      return null;
    }
  }
}